package com.ustcinfo.ishare.esCURD;

import com.ustcinfo.ishare.utils.Blog;
import com.ustcinfo.ishare.utils.ClassFieldsCollector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Created by Shinelon on 2017/12/26.
 * Wraps the Elasticsearch "insert" style operations (single insert, bulk insert,
 * upsert of fields, and join-field child inserts).
 */
public class ESInsert {
    private static final Logger LOGGER = LoggerFactory.getLogger(ESInsert.class);

    /** Index, type, id and routing used by the two join-child demo inserts. */
    private static final String JOIN_INDEX = "mytalent";
    private static final String JOIN_DOC_TYPE = "_doc";
    private static final String JOIN_CHILD_ID = "2";
    private static final String JOIN_PARENT_ROUTING = "1";

    /**
     * Child document for the join demos. The {@code talent_join} object names the
     * child relation and the id of its parent document.
     */
    private static final String JOIN_CHILD_JSON = "{\n" +
            "  \"cert_id\": \"1\",\n" +
            "  \"cert_name\": \"056\",\n" +
            "  \"talent_join\": {\n" +
            "    \"name\": \"talents_cert\",\n" +
            "    \"parent\": \"1\"\n" +
            "  }\n" +
            "}";

    /**
     * Inserts a child document into a join relation through the Java transport client.
     * The key points are the routing value (set to the parent id {@code "1"}) and the
     * parent reference carried inside the JSON body.
     *
     * <p>The supplied client is closed before this method returns, whether or not the
     * index call succeeds; callers must not reuse it afterwards.
     *
     * @param transportClient connected transport client; closed by this method
     */
    public static void transportClientInsertJoinChildData(TransportClient transportClient) {
        try {
            transportClient.prepareIndex(JOIN_INDEX, JOIN_DOC_TYPE, JOIN_CHILD_ID)
                    .setRouting(JOIN_PARENT_ROUTING)
                    .setSource(JOIN_CHILD_JSON, XContentType.JSON)
                    .execute().actionGet();
        } finally {
            // Close even when indexing throws, so the client's resources are not leaked.
            transportClient.close();
        }
    }

    /**
     * Inserts a child document into a join relation through the REST high-level client.
     * The key points are the routing value (set to the parent id {@code "1"}) and the
     * parent reference carried inside the JSON body.
     *
     * <p>The target host ({@code 99.12.92.200:9000}, http) is hard-coded. An
     * {@link IOException} raised while indexing or closing the client is logged and
     * not rethrown.
     *
     * @param args unused
     */
    public static void restfulClientInsertJoinChildData(String[] args) {
        IndexRequest request = new IndexRequest(JOIN_INDEX, JOIN_DOC_TYPE, JOIN_CHILD_ID);
        request.source(JOIN_CHILD_JSON, XContentType.JSON);
        request.routing(JOIN_PARENT_ROUTING);

        // try-with-resources guarantees the client is closed even if index() fails.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("99.12.92.200", 9000, "http")))) {
            client.index(request);
        } catch (IOException e) {
            LOGGER.error("Failed to insert join child document into index [{}]", JOIN_INDEX, e);
        }
    }

    /**
     * Inserts a single document, creating the index first when it does not exist.
     * The document source is the map produced by {@link ClassFieldsCollector} for
     * {@code object}.
     *
     * @param transportClient connected transport client
     * @param object          object whose collected fields become the document source
     * @param indexName       target index
     * @param type            target document type
     * @throws IOException declared for callers; propagated if raised while collecting fields
     */
    public void insertToIndex(TransportClient transportClient, Object object, String indexName, String type) throws IOException {
        ensureIndexExists(transportClient, indexName);
        // Raw Map kept on purpose: the collector's declared return type is not visible here.
        Map map = new ClassFieldsCollector().getFiledsInfo(object);
        transportClient.prepareIndex(indexName, type).setSource(map).execute().actionGet();
        LOGGER.info("插入文档成功！");
    }

    /**
     * Inserts several documents with one bulk request. Each element of {@code objects}
     * is converted to a source map by {@link ClassFieldsCollector}.
     *
     * <p>A {@code null} or empty list is skipped with a warning instead of sending an
     * empty bulk request. When the bulk response reports failures they are logged as
     * an error and the success message is not emitted.
     *
     * @param transportClient connected transport client
     * @param objects         objects to index
     * @param indexName       target index
     * @param type            target document type
     * @throws IOException declared for callers; propagated if raised while collecting fields
     */
    public void insertToIndexByBulk(TransportClient transportClient, List<Object> objects, String indexName, String type) throws IOException {
        if (objects == null || objects.isEmpty()) {
            LOGGER.warn("Bulk insert into index [{}] skipped: no documents supplied", indexName);
            return;
        }

        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        for (Object object : objects) {
            // Raw Map kept on purpose: the collector's declared return type is not visible here.
            Map map = new ClassFieldsCollector().getFiledsInfo(object);
            bulkRequestBuilder.add(transportClient.prepareIndex(indexName, type).setSource(map));
        }

        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            LOGGER.error("Bulk insert into index [{}] reported failures: {}",
                    indexName, bulkResponse.buildFailureMessage());
            return;
        }
        LOGGER.info("批量插入文档成功！");
    }

    /**
     * Adds fields (with values) to a document. An update request carrying {@code map}
     * as the partial document is sent, with an upsert index request that uses the same
     * {@code map} as the source.
     *
     * @param transportClient connected transport client
     * @param indexName       target index
     * @param type            target document type
     * @param id              document id
     * @param map             field names and values to add
     */
    public void insertField(TransportClient transportClient, String indexName, String type, String id, Map<String, Object> map) {
        IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(map);
        UpdateRequest updateRequest = new UpdateRequest(indexName, type, id).doc(map).upsert(indexRequest);
        // A failed update surfaces as an exception from actionGet(); reaching the next
        // line therefore means the request was accepted.
        transportClient.update(updateRequest).actionGet();
        LOGGER.info("增加字段成功");
    }

    /**
     * Indexes one {@link Blog} as a document, creating the index first when it does
     * not exist. No explicit mapping is sent to the cluster by this method.
     *
     * <p>The document is indexed under {@code type}; when {@code type} is {@code null}
     * or empty the index name is used as the type name. Failures while building or
     * indexing the document are logged and not rethrown.
     *
     * @param client    connected transport client
     * @param indexName target index
     * @param type      target document type; falls back to {@code indexName} when blank
     * @param blog      record whose properties populate the document
     */
    public void insertToElastic(TransportClient client, String indexName, String type, Blog blog) {
        String docType = (type == null || type.isEmpty()) ? indexName : type;

        XContentBuilder doc;
        try {
            doc = buildBlogDocument(blog);
        } catch (IOException e) {
            // Without a document there is nothing to index; stop before touching the cluster.
            LOGGER.error("Failed to build document for index [{}]", indexName, e);
            return;
        }

        try {
            ensureIndexExists(client, indexName);
            client.prepareIndex(indexName, docType).setSource(doc).execute().actionGet();
        } catch (RuntimeException e) {
            LOGGER.error("Failed to index document into [{}]/[{}]", indexName, docType, e);
        }
    }

    /** Creates {@code indexName} when the cluster reports that it does not exist yet. */
    private static void ensureIndexExists(TransportClient client, String indexName) {
        IndicesExistsResponse existsResponse = client.admin().indices()
                .exists(new IndicesExistsRequest(indexName)).actionGet();
        if (!existsResponse.isExists()) {
            client.admin().indices().prepareCreate(indexName).execute().actionGet();
        }
    }

    /** Builds the JSON document for {@code blog}; the {@code sign} field is always {@code "0"}. */
    private static XContentBuilder buildBlogDocument(Blog blog) throws IOException {
        return XContentFactory.jsonBuilder()
                .startObject()
                .field("sign", "0")
                .field("rowkey", blog.getRowkey())
                .field("id", blog.getId())
                .field("msgId", blog.getMsgid())
                .field("receiveTime", blog.getReceiveTime())
                .field("receiveSystem", blog.getReceiveSystem())
                .field("servCode", blog.getServCode())
                .field("sendSystem", blog.getSendSystem())
                .field("resendFlag", blog.getResendFlag())
                .field("serviceState", blog.getServiceState())
                .field("connFlag", blog.getConnFlag())
                .endObject();
    }
}
